package top.hypnos.bigdata.zookeeper.subcriber;

import org.apache.zookeeper.*;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;

/*************************************************
 * TODO_MA 马中华 https://blog.csdn.net/zhongqi2513
 *  注释： 发布者程序
 *  实现思路： Publisher程序只负责发布消息
 *  -
 *  发布者的逻辑：
 *  发布一个消息到 ZK 中。把消息存储在 ZK 中的一个 znode 上
 *  订阅者，因为订阅了 znode 节点的父节点的 NodeChildrenChanged 事件，所以 订阅者能收到通知。
 *  收到了通知，再去查询这个 znode 上的数据，就知道了发布的 消息到底是什么了。
 **/
public class Publisher {

    // zookeeper服务器地址
    private static final String CONNECT_INFO = "zs-test-ubuntu-01:2181,zs-test-ubuntu-02:2181,zs-test-ubuntu-03:2181";
    private static final int TIME_OUT = 4000;

    private static final String PARENT_NODE = "/popeye";
    private static final String SUB_NODE_NAME = "publish_info";
    private static final String SUB_NODE = PARENT_NODE + "/" + SUB_NODE_NAME;

    private static final String PUBLISH_INFO = "bigdata9876,9527,com.mazh.nx.Service9876,getName,xuzheng";

    private static final CountDownLatch countDownLatch = new CountDownLatch(1);

    // 会话对象
    private static ZooKeeper zookeeper = null;

    public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
        zookeeper = new ZooKeeper(CONNECT_INFO, TIME_OUT, event -> {
            // 确保链接建立
            if (countDownLatch.getCount() > 0 && event.getState() == Watcher.Event.KeeperState.SyncConnected) {
                System.out.println("创建会话链接成功");

                // 判断父节点是否存在
                try {
                    if (zookeeper.exists(PARENT_NODE, false) == null) {
                        zookeeper.create(PARENT_NODE, PARENT_NODE.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                    }
                } catch (KeeperException | InterruptedException e) {
                    e.printStackTrace();
                }

                countDownLatch.countDown();
            }

            // 发布者不用干点啥
        });

        countDownLatch.await();

        final var children = zookeeper.getChildren(PARENT_NODE, false);
        final int index;
        if (children.isEmpty()) {
            index = 1;
        } else {
            children.sort(String::compareTo);
            final var max = children.get(children.size() - 1);
            index = Integer.parseInt(max.replace(SUB_NODE_NAME, "")) + 1;
        }
        final var suffix = String.format("%03d", index);
        final var s = zookeeper.create(SUB_NODE + suffix, PUBLISH_INFO.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        System.out.println(s);

        zookeeper.close();
    }
}
